Airflowに関する個人的FAQ
Airflowを触っていて個人的にハマったことなどをFAQ形式でまとめてみました。全然Frequentlyじゃない気がするのはきっと気のせいです。
以下、バージョンは本記事公開時の最新 1.10.2 です。
インストール
インストールが失敗するのですが?
普通に pip install apache-airflow
とすると以下のエラーで失敗します。
RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE
Airflowが依存するライブラリの中にライセンスがGPL 2.0であるunidecodeが入ってしまっていることによるものです。 現状ではインストール時に以下のどちらかの環境変数を設定する必要があります。
SLUGIFY_USES_TEXT_UNIDECODE=yes
- unidecodeの代わりに非GPLである
text-unidecodeを使います。
- AIRFLOW_GPL_UNIDECODE=1.0.23
- GPLが含まれることを承知の上で、unidecodeをインストールします。
なおこの問題はすでに修正されています(pull request)。次期バージョンでは発生しなくなると思われます。
参考
AmazonLinuxだとインストールに失敗するのですが?
AmazonLinux2でインストールすると以下のエラーで失敗します。
Cannot uninstall 'enum34'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
pip以外でenumが先にインストールされているのが原因のようです。pipで強制的にenum34を上書きインストールすることで解消します。
sudo pip install --ignnore-installed enum34
参考
それでもやっぱりAmazonLinuxでインストールに失敗するのですが?
gccが無いとか言われたらgccをインストールしてください
sudo yum install gcc
設定
環境によって設定を変えられますか?
環境変数で設定を上書きすることができます。セクション名とキーで AIRFLOW__{SECTION}__{KEY}
のような変数名で指定します。(見にくいですが間のアンダースコアは2つです。)
例えばwebserverのURLは設定ファイルで以下のように指定しますが
[webserver] base_url = http://airflow-hostname:8080
これを環境変数で AIRFLOW__WEBSERVER__BASE_URL=http://dev.airflow-hostname:8080
のように設定することで上書きできます。
参考
workerのログをS3に出力できますか?
設定のcore
セクションで以下のように設定します
[core] remote_logging = True remote_log_conn_id = aws_default remote_base_log_folder = s3://bucket-name/path/to/log
remote_log_conn_id = aws_default
はデフォルトの接続設定で、Airflowを実行しているIAMロールにS3に書き込む権限があればOKです。
タスクが完了すると、UIからはS3に出力されたログを見ることができます。
参考
DAG関連
DAGにパラメータを渡せますか?
UIからはできないようですが、CLIのtrigger_dag
コマンドで実行すれば可能です。
オプション--conf
にJSON形式で指定します。
$ airflow trigger_dag <dag_id> --conf '{"key": "value"}'
confで指定した値をOperatorで利用するには以下の2つの方法があります。
template文字列内で受け取る
BashOperatorの引数bash_command
などJinja templateが使用できるところでは、{{ dag_run.conf['key'] }}
としてconfの値を受け取れます。
BashOperator( bash_command="echo {{ dag_run.conf['key'] }}", )
Jinja templateを使える引数は、Operatorのtemplate_fields
という変数で指定されているものです(例)。
APIリファレンスでは"templated"と書かれています。
context経由で受け取る
Operator内部のcontextという変数にdag_run
が入っています。
例えば
PythonOperatorの場合は、実行する関数python_callable
のキーワード引数としてcontextが渡ってきますので、以下のように使うことができます。
def print_value(**context): print(context['dag_run'].conf['key']) PythonOperator( python_callable=print_value, )
参考
BashOperatorが"TemplateNotFound"とかいうエラーで失敗するのですが?
コマンド文字列の末尾にスペースを入れると正常に実行できる場合があります。
参考
Operatorの中で設定値を使うことはできますか?
例えばあるDAGが失敗した時にSlackに通知したいが、URLを設定値から取得したい、というマニアックな場合を考えます。この場合、template内で conf.get('section', 'key')
のようにして取得することができます。
text = ''':no_entry: {{ dag.dag_id }} {{ run_id }} failed. {{ conf.get("webserver", "base_url") }}/admin/airflow/graph?dag_id={{ dag.dag_id }}&run_id={{ run_id }} ''' fail = SlackAPIPostOperator( task_id='notify_failuer', channel='#notify-channel', token=os.getenv('SLACK_API_TOKEN'), text=text, username='Airflow', default_args=args, dag=dag, )
このほか、templateで使えるものはドキュメントのココにまとまっています。
他のDAGの終了を待ち合わせることはできますか?
ExternalTaskSensorにそのような機能があります。
前のタスクのステータスによってフローを変えることはできますか?
前のタスクのステータスによって、というものは用意されていないようですが、 BranchPythonOperatorというものがあり、 pythonの関数の実行結果によってフローを変えることは可能です。
参考
スケジュール実行したくないのですが?
DAGのパラメータschedule_interval
をNone
にすれば可能です。
なお、default_args
のdictの中で指定しても効果はありませんので注意してください。
default_args = { 'owner': 'airflow', 'start_date': dates.days_ago(1), #'schedule_interval': None, <- コレは効かない } dag = DAG( dag_id='foo', default_args=default_args, schedule_interval=None, # コレ )
参考
失敗したタスクから先を実行し直したいのですが?
可能です。
- UIでは、やり直したいDagRunをGraph Viewで表示 -> やり直したいタスクをクリック -> "Clear"をクリック
- CLIでは
airflow clear -s -e -t task_a
ちなみに、DAGにconfを設定して実行していた場合も大丈夫です。最初に設定したのと同じconfがやり直す際にも設定されます。
参考
まとめ
以上、ほとんど個人的な備忘録ですが、自分がハマったり調査したことのメモとして残しておきます。 今後も何かあれば随時追記される、かもしれません。